package AkkaD

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._

//改造第六步:在Worker中创造一个主构造器,用来传递localhost和port,另外还传递消息需要包含的memory和cpu核数
class Worker(val masterHost:String,val masterPost:Int,val memory:Int,val cores:Int)  extends Actor{

  //改造第九步:使用java的UUID当做改造第八步中的WorkID作为参数
  val WORKER_ID = UUID.randomUUID().toString

  //更新:记录Worker从节点的发送心跳的时间间隔
  val HEARTBEAT_INTERVAL = 10000

  var masterUrl: String = _

  //更新:
  var master : ActorSelection = _

  //第六步:在Actor的构造方法之后,并Receive执行之前,先执行一次preStart()
  //相当于servlet中的init()
  override def preStart(): Unit = {

    //改造第五步:在Worker中获取Master的抽象变量
    //与Master建立连接,并通信(发消息),使用的是actorSelection(),此时拿到了一个Master的代理对象
      val master: ActorSelection = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPost/user/Master")
    //第七步:Worker向Master发消息
    //改造第八步:Worker向Master发送封装好的注册消息,此注册消息包括(WorkID,内存,cpu核数等信息)
    master ! RegisterWorker(WORKER_ID,memory,cores)
  }

  //Receive方法反复执行
  override def receive:Receive = {

//    case "Response" =>{
//      println("a response from Master")
//    }

    //更新:Master对注册的Worker存储注册之后需要向Worker发送消息,来说明Worker注册成功
    //此处是Worker节点接收到Master节点的反馈信息
    case RegisteredWorker(masterUrl) =>{
      this.masterUrl = masterUrl
      println("yanxin:"+masterUrl)
      //更新:启动定时器发送心跳
      import context.dispatcher

      context.system.scheduler.schedule(0 millis,HEARTBEAT_INTERVAL millis,self,SendHeartBeat)
    }

      //Worker先向自己self发送一下心跳
    case SendHeartBeat => {
      master ! HeartBeat(WORKER_ID)
    }

  }
}

object Worker {

  val WORKER_ACTOR_SYSTEM_NAME = "WorkerSystem"
  val WORKER_ACTOR_NAME = "Worker"

  def main(args: Array[String]): Unit = {

    //改造第四步:抽取变量
    val workerHost = args(0)
    val workerPort = args(1).toInt
    val masterHost = args(2)
    val masterPost = args(3).toInt

    val workerMemory = args(4).toInt
    val workerCores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$workerHost"
         |akka.remote.netty.tcp.port = "$workerPort"
      """.stripMargin

    val config: Config = ConfigFactory.parseString(configStr)
    //第四步:创建Worker的ActorSystem
    val actorSystem = ActorSystem(WORKER_ACTOR_SYSTEM_NAME,config)
    //第五步:创建Actor
    //改造第七步:创建Worker构造器的时候,将改造第六步中的参数传入,这样Worker中的preStart方法就可以使用参数了
    actorSystem.actorOf(Props(new Worker(masterHost,masterPost,workerMemory,workerCores)),WORKER_ACTOR_NAME)

  }
}
